IPC机制: 进程之间的通信,指两个进程之间进行数据交换
1. JoinableQueue 队列 和 Queue 队列的区别
- 生产者生产完数据后,执行 p.join() 方法,意思是等待消费者把所有数据都处理完
- 消费者进程每次从 JoinableQueue 队列中获取值并且处理完后都要执行 q.task_done() 方法,告诉生产者进程这个数据处理完了
2. 使用 JoinableQueue 队列的注意事项
- 所有消费者进程必须设置为守护进程
- 主进程中所有生产者进程必须执行 .join() 方法, 等待生产者进程执行完毕
3.使用 JoinableQueue 队列的执行顺序:生产者生产的数据全部被消费完 -> 生产者进程结束 -> 主进程代码执行结束 -> 消费者守护进程结束
import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
# 生产者
def producer(q, food):
for i in range(5):
q.put('%s-%s' % (i, food))
q.join() # 等待 消费者进程 把所有的数据处理完
# 消费者
def consumer(q, name):
while True:
time.sleep(random.random())
food = q.get()
print('%s,吃了%s' % (name, food))
q.task_done() # 告诉生产者进程我处理完了这一个数据
if __name__ == '__main__':
q = JoinableQueue()
# 生产者进程
p1 = Process(target=producer, args=(q, '苹果'))
p1.start()
p2 = Process(target=producer, args=(q, '西瓜'))
p2.start()
# 消费者进程 -> 所有的消费者进程都要开启守护进程,当主进程的代码执行结束后,消费者进程也随之结束,里面的循环也不会被执行
c1 = Process(target=consumer, args=(q, 'Kevin'))
c1.daemon = True
c1.start()
c2 = Process(target=consumer, args=(q, 'Yeung'))
c2.daemon = True
c2.start()
c3 = Process(target=consumer, args=(q, 'Aimer'))
c3.daemon = True
c3.start()
# 主进程中所有的生产者进程都要执行 .join 方法,等待 xxx 生产者进程结束
p1.join() # 等待 p1 生产者进程结束
p2.join() # 等待 p2 生产者进程结束